Source code for qtealeaves.observables.custom_correlation

# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
Custom observable
"""

# pylint: disable=too-many-locals
# pylint: disable=too-many-instance-attributes

import numpy as np

from qtealeaves.mpos import ITPO, DenseMPO, DenseMPOList, MPOSite
from qtealeaves.tooling import QTeaLeavesError, map_selector

from .tnobase import _TNObsBase

__all__ = ["TNObsCustom"]


[docs] class TNObsCustom(_TNObsBase): """ The custom observable measures the term of arbitrary size over the arbitrary positions. Thus, the observable is of type ``<A_i B_j ... C_k>``. The results are stored in 1D array, with the order as provided in input. All operators in the measured n-body term must be placed on different sites, i != j != ... != k". If the system is 2d or 3d, user can provide the positions of measurements either in the already flattened, i.e. 1d indexing, or as 2d/3d indexing. In the latter case, additional arguments (see below) must be provided. Arguments --------- name : str Define a label under which we can find the observable in the result dictionary. operators : list (of length term_size) of strings Identifiers/strings for the operators to be measured. site_indices : list of lists On which sites we want to measure specified operators. E.g. if we want to measure 2-body correlators on sites [1,2] and [3,4], `site_indices=[[1,2],[3,4]]`. REMARK: counting starts at 0. batch_size : int | None, optional None measures with a single iTPO. Any integer will be interpreted as batch size after every batch_size entries a new iTPO is created. The batch size addresses the problem of memory needs of the iTPO correlation measurement with all terms at once; lower batch sizes reduce the memory cost and increasing the computational cost to some extent. Default to None (Measure all correlations in one iTPO). **If working with 2d/3d systems:** dim : 1, 2, or 3, optional Dimensionality of the lattice (1d, 2d, 3d) Default is 1. mapping : string or instance of :py:class:`HilbertCurveMap`, optional If dim != 1, which 2d/3d to 1d mapping to use. Possible inputs are: 'HilbertCurveMap', 'SnakeMap', and 'ZigZagMap'. Default is 'HilbertCurveMap'. lattice : list, optional If working with 2d or 3d systems, a lattice size must be given to compute the corresponding mapping to 1d. """ # pylint: disable-next=too-many-arguments def __init__( self, name, operators, site_indices, batch_size=None, dim=1, mapping="HilbertCurveMap", lattice=None, ): super().__init__(name) self.measurable_ansaetze = ("MPS", "TTN", "TTO", "ATTN") if operators is not None: # how many sites per observable term, e.g. two-body or three-body term. self.term_size = [len(operators)] # max term size if there are more than one custom observable # defined in the simulation - look at __iadd__ function self.max_term_size = self.term_size[0] # run the input checks if self.term_size[0] == 1: raise ValueError( "We do not measure local terms as a custom correlations." ) else: self.term_size = None self.max_term_size = None self.operators = [operators] self.batch_size = [batch_size] # handling 2D and 3D if (dim is not None) and (site_indices is not None): # run the input checks if (dim > 1) and (lattice is None): raise ValueError( "If lattice dimensionality is >1, the lattice size must " "be given as the input." ) if isinstance(site_indices[0][0], list) and (dim == 1): dim_sites = len(site_indices[0][0]) raise ValueError( f"Dimension of the given site indices input, dim = {dim_sites}, " "does not correspond to the 1d system." ) if dim > 1: # if system dimensionality is 2d or 3d, the site indices array needs # to be flattened to 1d site_indices = self.get_sites_1d(site_indices, dim, lattice, mapping) # check if operators match number of specified sites if site_indices is not None: if any(len(inds) != len(operators) for inds in site_indices): raise ValueError( "Must provide one operator per site in measurement positions." ) self.site_indices = site_indices
[docs] @classmethod def empty(cls): """ Documentation see :func:`_TNObsBase.empty`. """ obj = cls(None, None, None) obj.name = [] obj.term_size = [] obj.operators = [] obj.site_indices = [] obj.batch_size = [] return obj
def __len__(self): """ Provide appropriate length method. """ return len(self.name) def __iadd__(self, other): """ Documentation see :func:`_TNObsBase.__iadd__`. """ if isinstance(other, TNObsCustom): self.name += other.name if self.max_term_size is not None: if other.term_size[0] > self.max_term_size: self.max_term_size = other.term_size[0] else: self.max_term_size = other.term_size[0] self.term_size += other.term_size self.operators += other.operators self.batch_size += other.batch_size self.site_indices.append(other.site_indices) else: raise QTeaLeavesError("__iadd__ not defined for these types.") return self
[docs] def collect_operators(self): """ Documentation see :func:`_TNObsBase.collect_operators`. In the case of n-body operators with n>2, we choose that the bulk operators have label 'r'. However, note that this has limitations if the symmetry number changes. """ for elem in self.operators: yield (elem[0], "l") for subelem in elem[1:-1]: yield (subelem, "r") yield (elem[-1], "r")
[docs] def to_itpo(self, operators, tensor_backend, num_sites): """ Return an ITPO representing the custom correlation observable. Since custom corr don't handle diagonal terms, the function is same for TTN and aTTN. Parameters ---------- operators: TNOperators The operator class tensor_backend: instance of `TensorBackend` Tensor backend of the simulation num_sites: int Number of sites of the state Returns ------- ITPO The ITPO class """ dense_mpo_list = DenseMPOList() for kk, _ in enumerate(self.name): num_meas = len(self.site_indices[kk]) for ii in range(num_meas): position = self.site_indices[kk][ii] if len(position) != len(np.unique(np.array(position))): raise ValueError( "For now cannot measure local (diagonal) terms with " "custom correlations. All sites within a term must " "be different." ) mpo_sites = [] for jj, key_op in enumerate(self.operators[kk]): site = MPOSite( position[jj], key_op, 1.0, 1.0, operators=operators, params={} ) mpo_sites.append(site) dense_mpo = DenseMPO(mpo_sites, tensor_backend=tensor_backend) dense_mpo_list.append(dense_mpo) # There are two conditions under which we want to yield: # 1) If the batch_size is filled. # 2) If batch_size is specified, and we are in the last point of the iteration. # The second enforces that each custom correlation is measured separately. # We assume this makes sense if the user asks for batches. condition1 = len(dense_mpo_list) == self.batch_size[kk] condition2 = self.batch_size[kk] is not None and ii == num_meas - 1 if condition1 or condition2: dense_mpo_list = dense_mpo_list.sort_sites() itpo = ITPO(num_sites) itpo.add_dense_mpo_list(dense_mpo_list) yield itpo # Start with new empty dense_mpo_list = DenseMPOList() if len(dense_mpo_list) == 0: # We finished exactly on the last term return # Sites are not ordered and we have to make links match anyways dense_mpo_list = dense_mpo_list.sort_sites() itpo = ITPO(num_sites) itpo.add_dense_mpo_list(dense_mpo_list) yield itpo
[docs] def get_sites_1d(self, site_indices, dim, lattice, mapping): """ In the case of 2D, or 3D input of the measurement position sites, give back the flattened (mapped to 1d) array for measurement position sites. Parameters ---------- site_indices : list of measurement positions On which sites we want to measure specified operators. REMARK: counting starts at 0. dim : 2 or 3 Dimensionality of the lattice (2d, 3d) lattice : list, optional Lattice size. mapping : string or instance of :py:class:`HilbertCurveMap` If dim != 1, which 2d/3d to 1d mapping to use. Possible inputs are: 'HilbertCurveMap', 'SnakeMap', and 'ZigZagMap'. """ # get the mapping map_indices = map_selector(dim, lattice, mapping) # define the new flattened site index list site_indices_flattened = [] for sites in site_indices: # get the sites for each subterm sites_ii = [] for ii in range(self.term_size[0]): sites_ii.append(map_indices[tuple(sites[ii])]) site_indices_flattened.append(sites_ii) return site_indices_flattened
[docs] def read(self, fh, **kwargs): """ Read the measurements of the correlation observable from fortran. **Arguments** fh : filehandle Read the information about the measurements from this filehandle. """ # First line is separator _ = fh.readline() is_meas = fh.readline().replace("\n", "").replace(" ", "") is_measured = is_meas == "T" ii = 0 for elem in self.name: if is_measured: # get how many subterms are measured within each measurement num_sites_measured = len(self.site_indices[ii]) # check if results are real or complex realcompl = fh.readline() if "C" in realcompl: # first read the results as list of strings str_values_r = [] str_values_c = [] for _ in range(num_sites_measured): str_values_r.append(fh.readline()) str_values_c.append(fh.readline()) # convert back to float values real_val = np.array(list(map(float, str_values_r))) compl_val = np.array(list(map(float, str_values_c))) # combine them into the complex array res = real_val + 1j * compl_val yield elem, res if "R" in realcompl: # first read the results as list of strings str_values_r = [] for _ in range(num_sites_measured): str_values_r.append(fh.readline()) # convert back to float values res = np.array(list(map(float, str_values_r))) yield elem, res ii += 1 else: yield elem, None
[docs] def write_results(self, fh, state_ansatz, **kwargs): """ See :func:`_TNObsBase.write_results`. """ is_measured = self.check_measurable(state_ansatz) # Write separator first fh.write("-" * 20 + "tnobscustom\n") # Assignment for the linter _ = fh.write("T \n") if is_measured else fh.write("F \n") if is_measured: for name_ii in self.name: # get the real and imaginary part of results array res = self.results_buffer[name_ii] res_real = np.real(res) res_imag = np.imag(res) num_sites_measured = len(res_real) # write if is complex if np.any(np.abs(res_imag) > 1e-12): # results are imaginary fh.write("C\n") # write both real and imaginary results term by term for ii in range(num_sites_measured): str_r = str(res_real[ii]) str_i = str(res_imag[ii]) fh.write(str_r + "\n") fh.write(str_i + "\n") else: # results are real fh.write("R\n") # write just the real parts for ii in range(num_sites_measured): str_r = str(res_real[ii]) fh.write(str_r + "\n") self.results_buffer = {}